#!/usr/bin/env python

# Copyright (c) 2013, Cornell University
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of HyperDex nor the names of its contributors may be
#       used to endorse or promote products derived from this software without
#       specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import collections
import datetime
import functools
import simplejson
import struct
import threading
import time

from flask import Flask
from flask import g
from flask import request
from flask import Response
import scipy
from werkzeug.contrib.cache import SimpleCache

import hyperdex.admin


# The two "forms" a property can take.  The form selects how the graph
# generator turns stored samples into chart values:
#   * AGGREGATE properties are charted as the difference between two samples
#     taken around each timeline point.
#   * INSTANT properties are charted as the sample stored at the timeline point.
AGGREGATE='aggregate'
INSTANT='instant'

# Description of one chartable property:
#   tag      -- identifier used as the key prefix in the stats database and in
#               chart requests
#   category -- grouping label reported to the client
#   name     -- human-readable label reported to the client
#   form     -- AGGREGATE or INSTANT
#   units    -- unit label; properties combined into one chart series must
#               share the same units
Property = collections.namedtuple('Property', ['tag', 'category', 'name', 'form', 'units'])
# Catalogue of every property the front end may request.
properties = [
    Property(tag='io.in_flight', category='I/O', name='I/Os In-Flight', form=INSTANT, units='requests'),
    Property(tag='io.io_ticks', category='I/O', name='Time Active', form=AGGREGATE, units='milliseconds'),
    Property(tag='io.read_bytes', category='I/O', name='Number of Bytes Read', form=AGGREGATE, units='bytes'),
    Property(tag='io.read_ios', category='I/O', name='Number of Read I/Os Performed', form=AGGREGATE, units='requests'),
    Property(tag='io.read_merges', category='I/O', name='Number of Read I/Os Merged In-Queue', form=AGGREGATE, units='requests'),
    Property(tag='io.read_ticks', category='I/O', name='Time Spent on Read Requests', form=AGGREGATE, units='milliseconds'),
    Property(tag='io.time_in_queue', category='I/O', name='Time Spent on All Requests', form=AGGREGATE, units='milliseconds'),
    Property(tag='io.write_bytes', category='I/O', name='Number of Bytes Written', form=AGGREGATE, units='bytes'),
    Property(tag='io.write_ios', category='I/O', name='Number of Write I/Os Performed', form=AGGREGATE, units='requests'),
    Property(tag='io.write_merges', category='I/O', name='Number of Write I/Os Merged In-Queue', form=AGGREGATE, units='requests'),
    Property(tag='io.write_ticks', category='I/O', name='Time Spent on Write Requests', form=AGGREGATE, units='milliseconds'),
    Property(tag='leveldb.files0', category='LevelDB', name='L0 Files', form=INSTANT, units='files'),
    Property(tag='leveldb.files1', category='LevelDB', name='L1 Files', form=INSTANT, units='files'),
    Property(tag='leveldb.files2', category='LevelDB', name='L2 Files', form=INSTANT, units='files'),
    Property(tag='leveldb.files3', category='LevelDB', name='L3 Files', form=INSTANT, units='files'),
    Property(tag='leveldb.files4', category='LevelDB', name='L4 Files', form=INSTANT, units='files'),
    Property(tag='leveldb.files5', category='LevelDB', name='L5 Files', form=INSTANT, units='files'),
    Property(tag='leveldb.files6', category='LevelDB', name='L6 Files', form=INSTANT, units='files'),
    Property(tag='leveldb.read0', category='LevelDB', name='L0 Bytes Read', form=AGGREGATE, units='bytes'),
    Property(tag='leveldb.read1', category='LevelDB', name='L1 Bytes Read', form=AGGREGATE, units='bytes'),
    Property(tag='leveldb.read2', category='LevelDB', name='L2 Bytes Read', form=AGGREGATE, units='bytes'),
    Property(tag='leveldb.read3', category='LevelDB', name='L3 Bytes Read', form=AGGREGATE, units='bytes'),
    Property(tag='leveldb.read4', category='LevelDB', name='L4 Bytes Read', form=AGGREGATE, units='bytes'),
    Property(tag='leveldb.read5', category='LevelDB', name='L5 Bytes Read', form=AGGREGATE, units='bytes'),
    Property(tag='leveldb.size0', category='LevelDB', name='L0 Size', form=INSTANT, units='bytes'),
    Property(tag='leveldb.size1', category='LevelDB', name='L1 Size', form=INSTANT, units='bytes'),
    Property(tag='leveldb.size2', category='LevelDB', name='L2 Size', form=INSTANT, units='bytes'),
    Property(tag='leveldb.size3', category='LevelDB', name='L3 Size', form=INSTANT, units='bytes'),
    Property(tag='leveldb.size4', category='LevelDB', name='L4 Size', form=INSTANT, units='bytes'),
    Property(tag='leveldb.size5', category='LevelDB', name='L5 Size', form=INSTANT, units='bytes'),
    Property(tag='leveldb.time0', category='LevelDB', name='L0 Compaction Time', form=AGGREGATE, units='seconds'),
    Property(tag='leveldb.time1', category='LevelDB', name='L1 Compaction Time', form=AGGREGATE, units='seconds'),
    Property(tag='leveldb.time2', category='LevelDB', name='L2 Compaction Time', form=AGGREGATE, units='seconds'),
    Property(tag='leveldb.time3', category='LevelDB', name='L3 Compaction Time', form=AGGREGATE, units='seconds'),
    Property(tag='leveldb.time4', category='LevelDB', name='L4 Compaction Time', form=AGGREGATE, units='seconds'),
    Property(tag='leveldb.time5', category='LevelDB', name='L5 Compaction Time', form=AGGREGATE, units='seconds'),
    Property(tag='leveldb.write0', category='LevelDB', name='L0 Bytes Written', form=AGGREGATE, units='bytes'),
    Property(tag='leveldb.write1', category='LevelDB', name='L1 Bytes Written', form=AGGREGATE, units='bytes'),
    Property(tag='leveldb.write2', category='LevelDB', name='L2 Bytes Written', form=AGGREGATE, units='bytes'),
    Property(tag='leveldb.write3', category='LevelDB', name='L3 Bytes Written', form=AGGREGATE, units='bytes'),
    Property(tag='leveldb.write4', category='LevelDB', name='L4 Bytes Written', form=AGGREGATE, units='bytes'),
    Property(tag='leveldb.write5', category='LevelDB', name='L5 Bytes Written', form=AGGREGATE, units='bytes'),
    Property(tag='msgs.chain_ack', category='Messages', name='Chain Acknowledgment', form=AGGREGATE, units='requests'),
    Property(tag='msgs.chain_gc', category='Messages', name='Chain Garbage Collect', form=AGGREGATE, units='requests'),
    Property(tag='msgs.chain_op', category='Messages', name='Chain Operation', form=AGGREGATE, units='requests'),
    Property(tag='msgs.chain_subspace', category='Messages', name='Chain Subspace', form=AGGREGATE, units='requests'),
    Property(tag='msgs.perf_counters', category='Messages', name='Perf Counters', form=AGGREGATE, units='requests'),
    Property(tag='msgs.req_atomic', category='Messages', name='Request Atomic', form=AGGREGATE, units='requests'),
    Property(tag='msgs.req_count', category='Messages', name='Request Count', form=AGGREGATE, units='requests'),
    Property(tag='msgs.req_get', category='Messages', name='Request Get', form=AGGREGATE, units='requests'),
    Property(tag='msgs.req_group_del', category='Messages', name='Request Group Del', form=AGGREGATE, units='requests'),
    Property(tag='msgs.req_search_describe', category='Messages', name='Request Search Describe', form=AGGREGATE, units='requests'),
    Property(tag='msgs.req_search_next', category='Messages', name='Request Search Next', form=AGGREGATE, units='requests'),
    Property(tag='msgs.req_search_start', category='Messages', name='Request Search Start', form=AGGREGATE, units='requests'),
    Property(tag='msgs.req_search_stop', category='Messages', name='Request Search Stop', form=AGGREGATE, units='requests'),
    Property(tag='msgs.req_sorted_search', category='Messages', name='Request Sorted Search', form=AGGREGATE, units='requests'),
    Property(tag='msgs.xfer_ack', category='Messages', name='Transfer Acknowledgement', form=AGGREGATE, units='requests'),
    Property(tag='msgs.xfer_op', category='Messages', name='Transfer Operation', form=AGGREGATE, units='requests'),
    None][:-1] # slicing done to enable all lines to end with comma
# Lookup table from a property's tag to its Property record.
properties_by_tag = dict([(p.tag, p) for p in properties])


# Separator placed between a property tag and the packed binary suffix of a
# stats-database key.
SEP = ':'


def roundup(x, y):
    """Round ``x`` up to the nearest multiple of ``y``.

    Both arguments are expected to be integers and ``y`` must be positive.
    The division is spelled ``//`` so the result is an integer regardless of
    whether the interpreter treats ``/`` as floor or true division; for
    integer inputs this is the same value the old ``/`` form produced on
    Python 2.

    Returns the smallest multiple of ``y`` that is greater than or equal
    to ``x``.
    """
    return ((x + y - 1) // y) * y


class PerformanceCounterThread(threading.Thread):
    """Thread that copies HyperDex performance counters into the stats database.

    Each counter sample is stored under the key
    ``<property> SEP <time, 8 bytes big-endian> <server, 8 bytes big-endian>``
    with the measurement as an 8-byte big-endian value.

    Args:
        db: database handle exposing ``Put(key, value)``.
        coordinator: coordinator host handed to ``hyperdex.admin.Admin``.
        port: coordinator port handed to ``hyperdex.admin.Admin``.
    """

    def __init__(self, db, coordinator, port):
        threading.Thread.__init__(self)
        self.db = db
        self.coordinator = coordinator
        self.port = port

    def run(self):
        """Consume the perf-counter stream and persist every sample."""
        self.admin = hyperdex.admin.Admin(self.coordinator, self.port)
        pc = self.admin.enable_perf_counters()
        for x in pc:
            try:
                # Keep SEP unambiguous inside the key: any SEP that appears in
                # the property name is replaced by the next character code.
                k = x['property'].replace(SEP, chr(ord(SEP) + 1))
                # The reported time is divided by 1,000,000 before being
                # stored (presumably nanoseconds to milliseconds -- confirm
                # against the HyperDex admin API).
                k += SEP + struct.pack('>QQ', x['time'] // 1000000, x['server'])
                v = struct.pack('>Q', x['measurement'])
            except struct.error:
                # Best effort: a sample that cannot be packed as unsigned
                # 64-bit integers is skipped rather than stopping the thread.
                continue
            # Write through the handle given to the constructor instead of
            # relying on a module-level ``db`` happening to exist.
            self.db.Put(k, v)


class ClusterMonitorThread(threading.Thread):
    """Thread that keeps the latest cluster configuration in the stats database.

    The configuration is polled from the coordinator and stored as JSON under
    the key ``'config'`` whenever its version number increases.

    Args:
        db: database handle exposing ``Put(key, value)``.
        coordinator: coordinator host handed to ``hyperdex.admin.Admin``.
        port: coordinator port handed to ``hyperdex.admin.Admin``.
    """

    def __init__(self, db, coordinator, port):
        threading.Thread.__init__(self)
        self.db = db
        self.coordinator = coordinator
        self.port = port

    def run(self):
        """Poll the coordinator forever, persisting each newer configuration."""
        self.admin = hyperdex.admin.Admin(self.coordinator, self.port)
        version = 0
        while True:
            config = self.admin.dump_config()
            if config['version'] > version:
                # Write through the handle given to the constructor instead of
                # relying on a module-level ``db`` happening to exist.
                self.db.Put('config', simplejson.dumps(config))
                version = config['version']
            # Poll four times per second.
            time.sleep(0.25)


class LevelDBGraphGenerator(object):
    """Turn the counter samples stored in the stats database into chart series.

    Samples are read with keys of the form
    ``<tag> SEP <timestamp, 8 bytes big-endian> <server id, 8 bytes big-endian>``
    and 8-byte big-endian values.  A "points" mapping, as used throughout this
    class, maps a server id to that server's value at one timeline position.

    Args:
        db: database handle exposing ``RangeIter(start, limit)``.
    """

    def __init__(self, db):
        self.db = db
        # Memoises per-timestamp lookups; entries are stored with a one
        # second timeout (see get_raw_data_points).
        self.cache = SimpleCache()

    def get_data(self, now, prop, group, duration, interval):
        """Return ``[(timestamp, value), ...]`` for one property.

        Args:
            now: reference time the timeline is derived from.
            prop: the ``Property`` to chart; its ``form`` picks the strategy.
            group: callable ``group(points, interval)`` reducing the
                per-server points of one timestamp to a single value.
            duration: number of timeline positions to produce.
            interval: spacing between timeline positions.
        """
        timeline = self.get_timeline(now, duration, interval)
        assert prop.form in (AGGREGATE, INSTANT)
        if prop.form == AGGREGATE:
            data = self.get_data_aggregate(prop, timeline, interval)
        else:
            data = self.get_data_instant(prop, timeline)
        return [(t, group(points, interval)) for t, points in data]

    def get_data_aggregate(self, prop, timeline, interval):
        """Return per-server counter deltas around every timeline position.

        For each position ``t`` the samples at ``t - interval/2`` and
        ``t + interval/2`` (both rounded up to a multiple of 100) are read;
        a server present in both gets ``limit - start``, a server present in
        only one of them gets 0.
        """
        half = long(interval / 2)  # loop-invariant half window
        data = []
        for t in timeline:
            t = long(t)
            start = self.get_raw_data_points(prop, roundup(t - half, 100))
            limit = self.get_raw_data_points(prop, roundup(t + half, 100))
            points = {}
            for s in set(start) | set(limit):
                if s in start and s in limit:
                    points[s] = limit[s] - start[s]
                else:
                    points[s] = 0
            data.append((t, points))
        return data

    def get_data_instant(self, prop, timeline):
        """Return the stored per-server samples at every timeline position.

        Every server seen anywhere on the timeline appears in every points
        mapping; positions where a server has no sample are filled with 0.
        """
        data = [(t, self.get_raw_data_points(prop, t)) for t in timeline]
        # Collect the server ids in one pass instead of concatenating the
        # key lists, which is quadratic in the timeline length.
        servers = set()
        for _, points in data:
            servers.update(points)
        filled = []
        for t, points in data:
            points = points.copy()  # never mutate the looked-up mapping
            for s in servers:
                points.setdefault(s, 0)
            filled.append((t, points))
        return filled

    def get_timeline(self, now, duration, interval):
        """Return ``duration`` ascending timestamps spaced ``interval`` apart.

        The newest timestamp is ``now - interval`` rounded down to a multiple
        of ``interval``.
        """
        now -= interval
        now = now - (now % interval)
        return sorted([now - x for x in range(0, duration * interval, interval)])

    def get_raw_data_points(self, prop, point):
        """Return ``{server id: value}`` for ``prop`` at timestamp ``point``."""
        start = prop.tag + SEP + struct.pack('>Q', point)
        limit = prop.tag + SEP + struct.pack('>Q', point + 1)
        cached = self.cache.get(('p', start, limit))
        if cached is not None:
            return cached
        it = self.db.RangeIter(start, limit)
        data = {}
        for k, v in it:
            # Only keys for exactly this timestamp are wanted; the range may
            # yield keys that do not share the prefix.
            if not k.startswith(start):
                continue
            # What follows the prefix is the packed server id.
            data[struct.unpack('>Q', k[len(start):])[0]] = struct.unpack('>Q', v)[0]
        self.cache.set(('p', start, limit), data, timeout=1)
        return data

    @staticmethod
    def _mean(points):
        """Return the arithmetic mean of the point values, or 0 if there are none."""
        values = list(points.values())
        if not values:
            return 0
        return sum(values) / float(len(values))

    def group_func_sum_aggregate(self, points, interval, opts=None):
        """Sum of all servers' deltas, scaled by ``1000 / interval``."""
        return sum(points.values()) * 1000. / interval

    def group_func_avg_aggregate(self, points, interval, opts=None):
        """Mean of all servers' deltas, scaled by ``1000 / interval``.

        Returns 0 when there are no points, matching the other group
        functions, instead of NaN.
        """
        return self._mean(points) * 1000. / interval

    def group_func_server_aggregate(self, points, interval, opts=None):
        """Delta of the server named by ``opts['id']``, scaled by ``1000 / interval``.

        Returns 0 when no id is given or the server has no point.
        """
        if isinstance(opts, dict) and 'id' in opts:
            sid = as_long(opts, 'id', 0)
            if sid in points:
                return points[sid] * 1000. / interval
        return 0

    def group_func_sum_instant(self, points, interval, opts=None):
        """Sum of all servers' sampled values."""
        return sum(points.values())

    def group_func_avg_instant(self, points, interval, opts=None):
        """Mean of all servers' sampled values; 0 when there are no points."""
        return self._mean(points)

    def group_func_server_instant(self, points, interval, opts=None):
        """Sampled value of the server named by ``opts['id']``.

        Returns 0 when no id is given or the server has no point.
        """
        if isinstance(opts, dict) and 'id' in opts:
            sid = as_long(opts, 'id', 0)
            if sid in points:
                return points[sid]
        return 0


# The Flask application object; the JSONP views below register their routes on it.
app = Flask(__name__)


def as_long(args, value, default):
    """Read ``args[value]`` as an integer, falling back to ``default``.

    Args:
        args: mapping supporting ``get`` (e.g. decoded JSON or request args).
        value: the key to look up.
        default: returned when the key is missing or not convertible.

    The default is returned both when the stored value is not a valid integer
    literal (``ValueError``) and when it is missing or of an unconvertible
    type such as ``None`` (``TypeError``).  ``int`` is used for the
    conversion; on Python 2 it promotes to ``long`` automatically when the
    value does not fit in a machine integer.
    """
    try:
        return int(args.get(value, None))
    except (TypeError, ValueError):
        return default


def as_string(args, value, default):
    """Read ``args[value]`` as a ``str``, falling back to ``default``.

    The default is returned when the key is absent, when its value is
    ``None``, or when the conversion raises ``ValueError``.
    """
    try:
        raw = args.get(value)
        return default if raw is None else str(raw)
    except ValueError:
        return default


def as_enum(args, value, default, choices):
    """Read ``args[value]`` as a string restricted to ``choices``.

    Returns ``default`` when the key is missing or its string form is not one
    of ``choices``.
    """
    candidate = as_string(args, value, default)
    return candidate if candidate in choices else default


def respond_jsonp(x, headers=None):
    """Serialise ``x`` to JSON and wrap it in the request's JSONP callback.

    Args:
        x: JSON-serialisable payload.
        headers: optional mapping of extra response headers.

    Returns:
        A ``Response`` whose body is ``<callback>(<json>);``.  When the
        request carries no ``callback`` argument the bare JSON document is
        returned instead of wrapping it in the literal text ``None(...)``.
    """
    cb = request.args.get("callback")
    j = simplejson.dumps(x)
    if cb is None:
        body = j
    else:
        # NOTE(review): the callback name is echoed from the query string
        # without validation -- confirm this endpoint is only reachable by
        # trusted clients, or restrict it to identifier characters.
        body = '{0}('.format(cb) + j + ');'
    # The headers are handed to the Response constructor only.  Adding them
    # to resp.headers again afterwards would emit every header twice.
    return Response(body, mimetype='application/json', headers=headers)


@app.route('/properties.json')
def view_properties():
    """Serve the property catalogue as a JSONP list of objects."""
    catalogue = []
    for prop in properties:
        catalogue.append({'tag': prop.tag,
                          'category': prop.category,
                          'name': prop.name,
                          'form': prop.form,
                          'units': prop.units})
    return respond_jsonp(catalogue)


@app.route('/config.json')
def view_config():
    """Serve the stored cluster configuration as JSONP.

    Server ids are sent as strings, with any 'L' characters stripped from
    both ends.
    """
    config = simplejson.loads(db.Get('config'))
    for server in config['servers']:
        server['id'] = str(server['id']).strip('L')
    return respond_jsonp(config)


@app.route('/chart.json')
def view_chart():
    """Serve chart data as a JSONP table with one row per timeline position.

    The ``args`` query parameter is a JSON object with optional ``duration``
    and ``interval`` entries and a ``data`` list.  Each ``data`` entry names
    the ``props`` to add together and may carry ``group``, ``group_opts`` and
    ``label``.  The first column of the table is the time; every later column
    is one ``data`` entry.
    """
    # Current time truncated to a whole second, expressed in milliseconds.
    now = int(time.time()) * 1000
    args = simplejson.loads(request.args.get('args'))
    duration = as_long(args, 'duration', 60)
    interval = as_long(args, 'interval', 1000)

    def to_time(millis):
        stamp = datetime.datetime.fromtimestamp(millis / 1000.)
        return stamp.strftime('%Y-%m-%dT%H:%M:%S')

    columns = []
    for desc in args.get('data', []):
        props = [properties_by_tag[tag] for tag in desc['props']]
        # Values with different units cannot be added into one series.
        distinct_units = set(p.units for p in props)
        if len(distinct_units) > 1:
            raise RuntimeError("Mismatched properties")
        series = []
        for p in props:
            group_name = as_enum(desc, 'group', 'sum', ['sum', 'avg', 'server'])
            group = getattr(lgg, 'group_func_' + group_name + '_' + p.form)
            group_opts = desc.get('group_opts')
            if group_opts is not None:
                group = functools.partial(group, opts=group_opts)
            series.append(lgg.get_data(now, p, group, duration, interval))
        if not series:
            continue
        if not columns:
            # The time column is emitted once, taken from the first series.
            columns.append(['Time'] + [to_time(point[0]) for point in series[0]])
        label = as_string(desc, 'label', 'Label')
        values = [[point[1] for point in one] for one in series]
        columns.append([label] + [sum(slot) for slot in zip(*values)])
    # Transpose the columns into rows.
    return respond_jsonp(zip(*columns))


import leveldb
import sys

# Coordinator address: first and second command-line arguments, defaulting to
# 127.0.0.1 and 1982.
HOST = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
PORT = int(sys.argv[2]) if len(sys.argv) > 2 else 1982

# Open the stats database in ./stats.  This handle is shared by the collector
# threads, the graph generator and the views.
db = leveldb.LevelDB('./stats')
# Start the two background threads before serving requests.
pct = PerformanceCounterThread(db, HOST, PORT)
pct.start()
cmt = ClusterMonitorThread(db, HOST, PORT)
cmt.start()
# Module-level graph generator used by the chart view.
lgg = LevelDBGraphGenerator(db)
# NOTE(review): debug=True turns on Flask's debug mode -- confirm this server
# is never exposed to untrusted networks.
app.run(debug=True, use_reloader=False, threaded=True)
